Splunk® Data Stream Processor

Connect to Data Sources and Destinations with DSP

On April 3, 2023, Splunk Data Stream Processor reached its end of sale, and will reach its end of life on February 28, 2025. If you are an existing DSP customer, please reach out to your account team for more information.

All DSP releases prior to DSP 1.4.0 use Gravity, a Kubernetes orchestrator, which has been announced end-of-life. We have replaced Gravity with an alternative component in DSP 1.4.0. Therefore, we will no longer provide support for versions of DSP prior to DSP 1.4.0 after July 1, 2023. We advise all of our customers to upgrade to DSP 1.4.0 in order to continue to receive full product support from Splunk.

Formatting DSP data for Parquet files in Amazon S3

When you send data from the Splunk Data Stream Processor to an Amazon S3 bucket, you can choose to store the data in Parquet format. However, when the records in the pipeline are converted to Parquet format, any data that is treated as a union of multiple data types, rather than being explicitly cast to a specific data type, becomes nested one level deeper under container fields. These container fields use generic names such as member0, member1, and so on. As a result, union-typed data becomes difficult to retrieve from the Parquet files.

To prevent important data from becoming obscured this way, extract the relevant data from union-typed fields into top-level fields that are cast to a data type other than union. For more information about union-typed data, see Data types in the Use the Data Stream Processor manual. For information about the functions that you can use to extract and cast your data, see the Evaluation scalar functions chapter in the Function Reference manual.
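The nesting problem described above can be pictured with a small Python model. This is only an illustrative sketch that runs outside of DSP: the helper nest_union, the BODY_UNION member list, and the member ordering are all hypothetical, and the real Parquet schema that DSP writes may differ in its details.

```python
# Illustrative model only. The wrapper layout and the member ordering are
# assumptions, not the actual Parquet schema that DSP writes.

def nest_union(value, member_types):
    """Wrap a union-typed value under a generic memberN container.

    member_types is a hypothetical ordered list of the types in the union.
    The index of the matching type decides the container name.
    """
    for index, member_type in enumerate(member_types):
        if type(value) is member_type:
            return {f"member{index}": value}
    raise TypeError(f"{value!r} does not match any union member")


# Hypothetical union of string, integer, and float members.
BODY_UNION = [str, int, float]

# A record sent as is: the body value ends up one level deeper.
nested_record = {"body": nest_union("region:us-west-1,userid:user1", BODY_UNION)}

# A record whose relevant values were promoted to typed top-level fields.
flat_record = {"Region": "us-west-1", "UserID": "user1"}

# Reading the nested record requires knowing the container name.
print(nested_record["body"]["member0"])
# Reading the flat record needs only the field name.
print(flat_record["Region"])
```

In this model, a consumer of the nested record has to know which memberN container holds the value, while a consumer of the flat record can query the field by name.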

Guidelines for formatting union-typed data for Parquet output

The exact data transformations that you need to include in your pipeline vary depending on the specific data that you are working with. The following are some general guidelines for how to check and format your data:

  • If your pipeline is streaming records that use the standard DSP event or metrics schemas, check the body and attributes fields for data that needs to be extracted. The body field is a union-typed field, and the attributes field is a map of string type to union type (map<string, union<...>>).
    • To format data from the body field, start by casting the field to a basic data type such as string. Casting the field to a data type other than union allows it to be used as input in streaming functions. Then, extract the relevant values from the body field into top-level fields. If necessary, cast the resulting top-level fields to other appropriate data types. See Example 1: Formatting body data for Parquet.
    • To format data from the attributes field, use dot notation to select relevant values from the map and promote them into top-level fields. If necessary, cast the resulting top-level fields to other appropriate data types. See Example 2: Formatting attributes data for Parquet.
  • You can confirm the data types of your fields by selecting the sink function in your pipeline, then selecting the View Configurations tab, and then checking the list of fields under Input Fields. The names of union-typed fields have a pair of angle brackets ( <> ) displayed beside them.

    Union-typed data can be contained by hierarchical types such as map or list. In such cases, the symbol for the hierarchical type is shown instead. For example, a map field is represented by the map symbol ( {} ) even if it's a map of string type to union type.

Example 1: Formatting body data for Parquet

Assume that records with the following body content are streaming through your pipeline:

Record 1:

body: "region:us-west-1,userid:user1"

Record 2:

body: "region:us-west-2,userid:user2"

The body field is a union-typed field. If you send these records as is to Amazon S3 using Parquet format, the data from body gets nested under a container field with a generic name like member0. To retrieve that data afterwards, you would need to know that the body data got nested under member0, and then run queries on body.member0.

You can make the Parquet-formatted data easier to work with by transforming those records in the pipeline before they reach the Send to Amazon S3 sink function:

  1. Cast the body field from union to string so that the field can be used as input in streaming functions. Then, extract the key-value pairs from the body string into a map field named extracted_body:
    1. Add an Eval function to your pipeline.
    2. On the View Configurations tab, enter the following expression in the Function field:
      extracted_body=extract_key_value(cast(body, "string"), ":", ",")
      
    The resulting extracted_body fields look like this:

    Record 1:

    extracted_body: {
         "region": "us-west-1",
         "userid": "user1"
         }

    Record 2:

    extracted_body: {
         "region": "us-west-2",
         "userid": "user2"
         }
  2. Promote the region key from the extracted_body field into a top-level string field named Region.
    1. Add an Eval function to your pipeline. Make sure to place this Eval function somewhere after the one that you added during step 1.
    2. On the View Configurations tab, enter the following expression in the Function field:
      Region=ucast(extracted_body.region, "string", null)
      
  3. Promote the userid key from the extracted_body field into a top-level string field named UserID.
    1. Add an Eval function to your pipeline. Make sure to place this Eval function somewhere after the one that you added during step 1.
    2. On the View Configurations tab, enter the following expression in the Function field:
      UserID=ucast(extracted_body.userid, "string", null)
      
  4. Now that you have extracted the relevant data from the body field, drop it from your records. You can also drop the extracted_body field, which was only used earlier in the pipeline to support the extraction and casting of the body data.
    1. Add a Fields function to your pipeline. Make sure to place this Fields function somewhere after all the Eval functions that you added during the previous steps.
    2. On the View Configurations tab, in the Field list field, enter body.
    3. Click Add, and in the newly added field, enter extracted_body.
    4. In the Operator field, enter a minus sign ( - ).

The following SPL2 statement shows what a complete pipeline containing these transformations looks like:

| from splunk_firehose() 
| eval extracted_body=extract_key_value(cast(body, "string"), ":", ",") 
| eval Region=ucast(extracted_body.region, "string", null) 
| eval UserID=ucast(extracted_body.userid, "string", null) 
| fields - body, extracted_body
| into into_s3("0d5690ec-7da0-446f-b2b8-ccc18e84a634", "my-bucket", "#{datetime:yyyy-MM}", 0, 0, "Parquet", allow_dropping_events:false);

When the transformed records are converted to Parquet format, Region and UserID remain as top-level fields. You can then retrieve the data by querying Region and UserID directly.
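To make the effect of these steps easier to follow, here is a rough Python emulation of the same transformations applied to the two sample records. It is a sketch that runs outside of DSP, not the pipeline itself: the Python functions extract_key_value and format_body_record are hypothetical stand-ins written for this illustration, and they only approximate the behavior of the DSP functions used in the example.

```python
def extract_key_value(text, kv_delimiter, pair_delimiter):
    """Rough stand-in for the extraction step: split a string into a map."""
    result = {}
    for pair in text.split(pair_delimiter):
        key, separator, value = pair.partition(kv_delimiter)
        if separator:  # skip fragments that have no key-value delimiter
            result[key] = value
    return result


def format_body_record(record):
    """Apply the four steps from Example 1 to one record (a dict)."""
    # Step 1: treat body as a string and extract its key-value pairs.
    extracted_body = extract_key_value(str(record["body"]), ":", ",")
    # Step 4 (done up front here): copy every field except body.
    out = {key: value for key, value in record.items() if key != "body"}
    # Steps 2 and 3: promote the values to top-level fields.
    # None plays the role of the null default.
    out["Region"] = extracted_body.get("region")
    out["UserID"] = extracted_body.get("userid")
    return out


records = [
    {"body": "region:us-west-1,userid:user1"},
    {"body": "region:us-west-2,userid:user2"},
]
for record in records:
    print(format_body_record(record))
# {'Region': 'us-west-1', 'UserID': 'user1'}
# {'Region': 'us-west-2', 'UserID': 'user2'}
```

The output records contain only the promoted top-level fields, which mirrors the shape that the pipeline hands to the sink function.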

Example 2: Formatting attributes data for Parquet

Assume that records with the following attributes content are streaming through your pipeline:

Record 1:

attributes: {
     "purchase_status": "complete",
     "item_count": "3"
     }

Record 2:

attributes: {
     "purchase_status": "pending",
     "item_count": "5"
     }

The attributes field is a map of string type to union type (map<string, union<...>>). If you send these records as is to Amazon S3 using Parquet format, the data from the attributes field gets nested under container fields with generic names like member0, member1, and so on. To retrieve that data afterwards, you would need to know which container fields purchase_status and item_count got nested under, and run queries on entities like attributes.member0 and attributes.member1.

You can make the Parquet-formatted data easier to work with by transforming those records in the pipeline before they reach the Send to Amazon S3 sink function:

  1. Extract the purchase_status key from the attributes field into a top-level string field named PurchaseStatus.
    1. Add an Eval function to your pipeline.
    2. On the View Configurations tab, enter the following expression in the Function field:
      PurchaseStatus=ucast(attributes.purchase_status, "string", null)
      
  2. Extract the item_count key from the attributes field into a top-level integer field named ItemCount.
    1. Add an Eval function to your pipeline.
    2. On the View Configurations tab, enter the following expression in the Function field:
      ItemCount=ucast(attributes.item_count, "integer", null)
      
  3. Now that you have extracted the relevant data from the attributes field, drop it from your records.
    1. Add a Fields function to your pipeline. Make sure to place this Fields function somewhere after all the Eval functions that you added during the previous steps.
    2. On the View Configurations tab, in the Field list field, enter attributes.
    3. In the Operator field, enter a minus sign ( - ).

The following SPL2 statement shows what a complete pipeline containing these transformations looks like:

| from splunk_firehose() 
| eval PurchaseStatus=ucast(attributes.purchase_status, "string", null) 
| eval ItemCount=ucast(attributes.item_count, "integer", null) 
| fields - attributes
| into into_s3("0d5690ec-7da0-446f-b2b8-ccc18e84a634", "my-bucket", "#{datetime:yyyy-MM}", 0, 0, "Parquet", allow_dropping_events:false);

When the transformed records are converted to Parquet format, PurchaseStatus and ItemCount remain as top-level fields. You can then retrieve the data by querying PurchaseStatus and ItemCount directly.
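The promotion of map values with a fallback default can also be sketched in Python. This runs outside of DSP and is only an approximation: cast_or_default and format_attributes_record are hypothetical helpers, and the conversion rules shown here (for example, turning the string "3" into the integer 3) are Python behavior assumed for the illustration, not a statement about how DSP casts values.

```python
def cast_or_default(value, target_type, default=None):
    """Return value as target_type, or default when that is not possible.

    Loosely modeled on a cast that takes a default value as its last
    argument. The conversion rules are Python's, which is an assumption
    made for this sketch.
    """
    if value is None:
        return default
    if type(value) is target_type:
        return value
    try:
        return target_type(value)
    except (TypeError, ValueError):
        return default


def format_attributes_record(record):
    """Apply the three steps from Example 2 to one record (a dict)."""
    attributes = record.get("attributes") or {}
    # Step 3 (done up front here): copy every field except attributes.
    out = {key: value for key, value in record.items() if key != "attributes"}
    # Steps 1 and 2: promote the values to typed top-level fields.
    out["PurchaseStatus"] = cast_or_default(attributes.get("purchase_status"), str)
    out["ItemCount"] = cast_or_default(attributes.get("item_count"), int)
    return out


records = [
    {"attributes": {"purchase_status": "complete", "item_count": "3"}},
    {"attributes": {"purchase_status": "pending", "item_count": "5"}},
]
for record in records:
    print(format_attributes_record(record))
```

A value that is missing or cannot be converted falls back to None in this sketch, which stands in for the null default passed in the expressions above.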

See also

Functions
  • Eval
  • Casting
  • Fields
  • Send data to Amazon S3
  • String manipulation

Related topics
  • Accessing map elements using dot notation

Last modified on 22 March, 2022

This documentation applies to the following versions of Splunk® Data Stream Processor: 1.3.0, 1.3.1, 1.4.0, 1.4.1, 1.4.2, 1.4.3, 1.4.4, 1.4.5, 1.4.6
